package com.itmck.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import com.itmck.dao.DataSwMbqcdssBatchInsertMapper;
import com.itmck.dto.DataSwMbqcdssDO;
import com.itmck.dto.DataSwMbqcdssImportExcelVO;
import com.itmck.pojo.DataExcelImportRespVO;
import com.itmck.service.FunctionDatasourceService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;


@Slf4j
@Service
public class FunctionDatasourceServiceImpl implements FunctionDatasourceService {

    @Resource
    private DataSwMbqcdssBatchInsertMapper dataSwMbqcdssBatchInsertMapper;

    @Override
    @Transactional(rollbackFor = Exception.class) // 添加事务，异常则回滚所有导入
    public DataExcelImportRespVO createFunctionDatasourceAndSwMbqcdss(MultipartFile file) throws IOException, ExecutionException, InterruptedException {

        //开始计时
        TimeInterval timer = DateUtil.timer();
        DataExcelImportRespVO respVO = readExcelAndSaveAsync(DataSwMbqcdssImportExcelVO.class,
                file,
                data -> {
                    DataSwMbqcdssDO newDo = new DataSwMbqcdssDO();
                    //数据预处理，因为目前的批处理方法不会再自动填充数据，所以这里手动填充
                    return newDo;
                },
                dataSwMbqcdssBatchInsertMapper::insertBatchSomeColumn);
        //结束计时
        long interval = timer.interval();
        log.info("导入数据共花费：{}s", interval / 1000);
        return respVO;
    }

    /**
     * 异步多线程导入数据
     * 采用自定义注入mybatis-plus的SQL注入器，实现真正的BatchInsert，但是需要注意的是项目配置文件需要在jdbc的url后面加上rewriteBatchedStatements=true
     *
     * @param head       Excel导入实体类的class
     * @param file       要导入的Excel文件
     * @param function   数据处理函数，对数据加工
     * @param dbFunction 数据库操作
     * @param <T>        Excel导入实体类   例如DataSwMbqcdssImportExcelVO
     * @param <R>        数据库实体类  例如DataSwMbqcdssDO
     * @return 导入结果
     * @throws IOException
     * @throws ExecutionException
     * @throws InterruptedException
     */
    private <T, R> DataExcelImportRespVO readExcelAndSaveAsync(Class<T> head,
                                                               MultipartFile file,
                                                               Function<T, R> function,
                                                               Function<List<R>, Integer> dbFunction
    ) throws IOException, ExecutionException, InterruptedException {
        Integer successCount = 0;
        Integer failCount = 0;
        //存储异步线程的执行结果
        Collection<Future<int[]>> futures = new ArrayList<>();

//        EasyExcel.read(file.getInputStream(),
//                        head,
//                        new BatchReadListener<T>(
//                                dataList -> {
//                                    //转换DO，并设置数据源id
//                                    List<R> list = dataList.parallelStream().map(function).collect(Collectors.toList());
//                                    //异步批量插入
//                                    futures.add(this.saveAsyncBatch(list, dbFunction));
//                                })
//                )
//                .sheet()
//                .doRead();
        //等待异步线程执行完毕
        for (Future<int[]> future : futures) {
            int[] counts = future.get();
            successCount += counts[0];
            failCount += counts[1];
        }
        log.info("存储成功总数据量：{},存储失败总数据量:{}", successCount, failCount);
        DataExcelImportRespVO respVO = DataExcelImportRespVO.builder().successCount(successCount).failCount(failCount).build();
        return respVO;
    }

    /**
     * 批量插入
     *
     * @param list       要分批处理的数据
     * @param dbFunction 数据库操作的方法
     * @param <T>        数据库实体类
     * @return 返回处理结果
     */
    @Async
    public <T> Future<int[]> saveAsyncBatch(List<T> list, Function<List<T>, Integer> dbFunction) {
        int size = list.size();
        int[] result = new int[2];
        log.info("saveAsyncBatch当前数据分片大小 size:{}", size);
        try {
            if (dbFunction.apply(list) > 0) {
                result[0] = size;
                log.info("{} 分片存储数据成功,数据量：{}", Thread.currentThread().getName(), size);
            } else {
                result[1] = size;
                log.info("{} 分片存储数据失败：{}", Thread.currentThread().getName(), size);
            }
        } catch (Exception e) {
            result[1] = size;
            log.error("{} 分片存储数据出现异常，{}", Thread.currentThread().getName(), e.getMessage());
        }

        return new AsyncResult<int[]>(result);
    }

}
